﻿/**
* CRL
*/
using CRL.Core;
using CRL.Core.Extension;
using CRL.Core.Remoting;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace CRL.EventBus.Queue
{
    delegate void CallbackEventHandler(object sender, EventDeclare ed, object data, IData reqData);
    public abstract class AbsQueue : IDisposable
    {
        public string Name { get; set; }
        public abstract MQType MQType { get; }
        public AbsQueue()
        {
            //Name = Guid.NewGuid().ToString();
        }

        public abstract void Dispose();

        public abstract void PublishList(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null);
        public abstract Task PublishListAsync(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null);
        public virtual void ConfirmConsume()
        {

        }
        public abstract void RePublish(IData data, object msg);

        public abstract void Subscribe(EventDeclare eventDeclare);
        public abstract void SubscribeAsync(EventDeclare eventDeclare);
        public virtual bool DeleteMsg(string msgKey)
        {
            throw new NotImplementedException();
        }
        public virtual bool ConsumeMsg(string msgKey)
        {
            throw new NotImplementedException();
        }
        public virtual long CleanQueue(string routingKey)
        {
            return 0;
        }
        public virtual long GetQueueLength(string routingKey)
        {
            return 0;
        }
        public virtual void ConsumeMsg<T>(string routingKey, int take, Action<List<T>> action, string msgKey = "")
        {
            throw new NotImplementedException();
        }
        protected void Log(EventDeclare ed, string msg, string logName = "")
        {
            if (string.IsNullOrEmpty(logName))
            {
                logName = ed.Name;
            }
            EventLog.Log($"{ed.Name} {msg}", logName);
            //if (System.Diagnostics.Debugger.IsAttached)
            //{
            //    Console.WriteLine($"{ed.Name} {msg}");
            //}
        }
        //internal event CallbackEventHandler onSubCallback;
        #region inner
        static Dictionary<Type, AsyncResult> asyncResultObjCache = new Dictionary<Type, AsyncResult>();
        public class objCheck
        {
            public bool invokeSuccess;
            public object finalResult = false;
            public bool isBoolResult;
        }
        protected async Task<object> getInvokeResult(EventDeclare ed, object instance, object obj)
        {
            var result = ed.MethodInvoke.Invoke(instance, new object[] { obj });
            if (result is Task)
            {
                var task = result as Task;
                var argsType = result.GetType().GetGenericArguments().FirstOrDefault();
                if (argsType.Name == "VoidTaskResult")// void task
                {
                    await task;
                    return null;
                }
                else//task<T>
                {
                    var a = asyncResultObjCache.TryGetValue(argsType, out var asynObj);
                    if (!a)
                    {
                        var taskType = typeof(AsyncResult<>).MakeGenericType(argsType);
                        var taskCreater = DynamicMethodHelper.CreateCtorFunc<Func<AsyncResult>>(taskType, new Type[0]);
                        asynObj = taskCreater.Invoke();
                        asyncResultObjCache.Add(argsType, asynObj);
                    }
                    return await asynObj.GetResult2(task);
                }
            }
            else
            {
                return result;
            }
        }
        protected objCheck checkInvokeSuccess(object result)
        {
            var success = true;
            if (result != null && result is Task)
            {
                throw new Exception("先调用getInvokeResult");
            }
            var isBoolResult = false;
            //按bool值
            if (result != null && result is bool)
            {
                success = (bool)result;
                isBoolResult = true;
            }
            return new objCheck
            {
                invokeSuccess = success,
                finalResult = result,
                isBoolResult = isBoolResult
            };
        }
        /// <summary>
        /// 重发和回调
        /// </summary>
        /// <param name="objCheck"></param>
        /// <param name="data"></param>
        /// <param name="ed"></param>
        /// <param name="msg"></param>
        protected void CheckResult(objCheck objCheck, IData data, EventDeclare ed, object msg)
        {
            //返回false或未成功的task需要重发
            //最终结果值非bool需要回调
            //var invokeSuccess = checkInvokeSuccess(result);
            #region 回调 
            //最终结果值非BOOL
            if (objCheck.invokeSuccess && objCheck.finalResult != null && !(objCheck.isBoolResult))
            {
                if (!ed.Name.EndsWith("_callBack"))
                {
                    PublishList($"{ed.Name}_callBack", new List<object> { objCheck.finalResult });
                    //Console.WriteLine($"发送回调 {ed.Name} {result}");
                    var model = callbackStatus.Get(ed.Name);
                    model?.invoke(this, ed, objCheck.finalResult, data);
                }
                return;
            }
            #endregion
            if (MQType == MQType.RabbitMQ && !objCheck.isBoolResult)
            {
                ////异常时，rabbitmq内部实现重发
                //return;
            }
            //var isDbQueue = GetType() == typeof(DbQueue) || GetType() == typeof(MongoDb);
            var isDbQueue = true;
            var attr = ed.SubscribeAttribute;
            //db类默认无限重试
            if ((attr.RetryTimes > 0 || isDbQueue) && !objCheck.invokeSuccess)
            {
                var retry = data.RetryTimes < attr.RetryTimes;
                if (isDbQueue)
                {
                    retry = attr.RetryTimes == 0 || data.RetryTimes < attr.RetryTimes;
                }
                if (retry)
                {
                    RePublish(data, msg);
                }
                else
                {
                    DeleteData(data);
                }
            }
        }
        public virtual void DeleteData(IData data)
        {

        }

        async Task<object> handlerMsgAsync(string msg, EventDeclare ed)
        {
            var instance = ed.CreateServiceInstance();
            if (!ed.IsArray)//订阅单个
            {
                object result = null;
                if (instance == null && !ed.FromManual)
                {
                    throw new Exception("ed.CreateServiceInstance is null");
                }
                var classType = typeof(List<>);
                var msgPackage = msg.ToObject<MsgPackage>();
                if (msgPackage._InnerData?.ToString().StartsWith("[") == true)//传入了数组
                {
                    var constructedType = classType.MakeGenericType(ed.EventDataType);
                    var list = msgPackage.GetData(constructedType) as IEnumerable;
                    var enumerator = list.GetEnumerator();
                    while (enumerator.MoveNext())
                    {
                        try
                        {
                            //result = ed.MethodInvoke.Invoke(instance, new object[] { enumerator.Current });
                            result = await getInvokeResult(ed, instance, enumerator.Current);
                            var objCheck = checkInvokeSuccess(result);
                            CheckResult(objCheck, msgPackage, ed, enumerator.Current);
                        }
                        catch (Exception ero)
                        {
                            //if (GetType() != typeof(RabbitMQ))
                            //{
                            CheckResult(new objCheck(), msgPackage, ed, enumerator.Current);
                            //}
                            Log(ed, ero.ToString());
                            //throw ero;
                        }

                    }
                }
                else
                {
                    //var obj = msgPackage.GetData(ed.EventDataType);
                    object obj;
                    try
                    {
                        obj = msgPackage.GetData(ed.EventDataType);
                    }
                    catch (Exception ex)
                    {
                        throw new Exception($"发布和订阅类型冲突 {ex.Message} subName:{ed.Name} {ed.EventDataType}=>{msgPackage._InnerData}");
                    }
                    try
                    {
                        //result = ed.MethodInvoke.Invoke(instance, new object[] { obj });
                        result = await getInvokeResult(ed, instance, obj);
                        var objCheck = checkInvokeSuccess(result);
                        CheckResult(objCheck, msgPackage, ed, obj);
                    }
                    catch (Exception ero)
                    {
                        //if (GetType() != typeof(RabbitMQ))
                        //{
                        CheckResult(new objCheck(), msgPackage, ed, obj);
                        //}
                        Log(ed, ero.ToString());
                        //throw ero;
                    }
                }
                return result;
            }
            else//订阅集合
            {
                var msgPackage = msg.ToObject<MsgPackage>();
                //var obj = msgPackage.GetData(ed.EventDataType);
                object obj;
                try
                {
                    obj = msgPackage.GetData(ed.EventDataType);
                }
                catch (Exception ex)
                {
                    throw new Exception($"发布和订阅类型冲突 {ex.Message} subName:{ed.Name} {ed.EventDataType}=>{msgPackage._InnerData}");
                }
                //var instance = ed.CreateServiceInstance();
                if (instance == null)
                {
                    throw new Exception("ed.CreateServiceInstance is null");
                }
                object result = true;
                try
                {
                    //result = ed.MethodInvoke.Invoke(instance, new object[] { obj });
                    result = await getInvokeResult(ed, instance, obj);
                    var objCheck = checkInvokeSuccess(result);
                    CheckResult(objCheck, msgPackage, ed, obj);
                }
                catch (Exception ero)
                {
                    CheckResult(new objCheck(), msgPackage, ed, obj);
                    Log(ed, ero.ToString());
                    //throw ero;//总是抛出异常以内部处理
                }
                return result;
            }
        }
        protected internal void OnReceiveString(string msg, string key)
        {
            var ed = SubscribeService.GetEventDeclare(key);
            if (ed == null)
            {
                return;
            }
            handlerMsgAsync(msg, ed).Wait();
        }
        protected internal async Task OnReceiveAsync(string msg, string key)
        {
            var ed = SubscribeService.GetEventDeclare(key);
            if (ed == null)
            {
                return;
            }
            await handlerMsgAsync(msg, ed);
        }
        #endregion
    }
}
